限速神器RateLimiter源码解析
Tech导读
在软件系统中,面对高并发的场景,经常需要通过限流来降低系统压力、保护系统不被压垮;另外在交易处理的场景中,也经常因下游要求或其他原因需控制处理速率。RateLimiter是谷歌开源的一款轻巧限流限速组件,简单实用,设计精妙,本文结合示例对其源码进行了相关分析解读,包括代码层级、处理流程、数据流转、计算逻辑等, 希望能够帮助大家了解和使用。
导读
在软件系统中,面对高并发的场景,经常需要通过限流来降低系统压力、保护系统不被压垮;另外在交易处理的场景中,也经常因下游要求或其他原因需控制处理速率。RateLimiter是谷歌开源的一款轻巧限流限速组件,简单实用,设计精妙,本文结合示例对其源码进行了相关分析解读,包括代码层级、处理流程、数据流转、计算逻辑等, 希望能够帮助大家了解和使用。01 目录指引
在今年的敏捷团队建设中,我通过Suite执行器实现了一键自动化单元测试。Juint除了Suite执行器还有哪些执行器呢?由此我的Runner探索之旅开始了!
02
限流场景
理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取最新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,最终将目标页面展示到屏幕。软件系统中一般有两种场景会用到限流:
软件系统中一般有两种场景会用到限流:
场景一、高并发的用户端场景。尤其是C端系统,经常面对海量用户请求,如不做限流,遇到瞬间高并发的场景,则可能压垮系统。 场景二、内部交易处理场景。如某类交易任务处理时有速率要求,再如上下游调用时下游对上游有速率要求。
无论哪种场景,都需要对请求处理的速率进行限制,或者每单个请求处理的速率固定(模式一),或者每批次请求的处理速率固定(模式二),见下图:
常用的限流算法有如下几种:
算法一、信号量算法。维护最大的并发请求数(如连接数),当并发请求数达到阈值时报错或等待,如线程池。 算法二、漏桶算法。模拟一个按固定速率漏出的桶,当流入的请求量大于桶的容量时溢出。 算法三、令牌桶算法。以固定速率向桶内发放令牌,请求处理时,先从桶里获取令牌,只服务有令牌的请求。
03
使用介绍
理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取最新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,最终将目标页面展示到屏幕。RateLimiter使用时只需引入guava jar便可,最新的版本是31.1-jre, 本文介绍的源码也是此版本。
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
示例一、有一系列任务列表要提交执行,控制提交速率不超过每秒2个。
final RateLimiter rateLimiter = RateLimiter.create(2.0); // 创建一个每秒2个许可的RateLimiter对象.
void submitTasks(List<Runnable> tasks, Executor executor) {
for (Runnable task : tasks) {
rateLimiter.acquire(); // 此处可能有等待
executor.execute(task);
}
}
final RateLimiter rateLimiter = RateLimiter.create(5000.0); // 创建一个每秒5k个许可的RateLimiter对象
void submitPacket(byte[] packet) {
rateLimiter.acquire(packet.length);
networkService.send(packet);
}
可以看出RateLimiter的使用非常简单,只需要构造限速器,调用获取许可方法便可,不需要释放许可。
04 算法介绍
理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取最新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,最终将目在介绍之前,先说一下RateLimiter中的几个名词:
在介绍之前,先说一下RateLimiter中的几个名词:
•许可(permit):代表一个令牌,获取到许可的请求才能放行。
•资源利用不足(underunilization): 许可的发放一般是匀速的,但请求未必是匀速的,有时会有无请求(资源利用不足)的场景,令牌桶会有贮存机制。
•贮存许可(storedPermit): 令牌桶支持对空闲资源进行许可贮存,许可请求时优先使用贮存许可。
•新鲜许可(freshPermit): 当贮存许可为空时,采用透支方式,下发新鲜许可,同时设置下次许可生效时间为本次新鲜许可的结束时间。
如下为一个许可发放示例,矩形代表整个令牌桶,许可产生速度为1个/秒,令牌桶里有一个贮存桶,容量为2。
05 代码结构和主体流程
理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取最新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,最终将目标页面展示到屏幕。
5.1 代码结构
5.2 主体流程
5.3 SmoothBursty算法
5.3.1 限速器创建
public static RateLimiter create(double permitsPerSecond) {
// permitsPerSecond指每秒允许的许可数. 该方法调用了下面的方法
return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}
// 创建SmoothBursty(固定贮存1s的贮存许可), 然后设置速率
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
1、SmoothBursty的构造方法相对简单:
SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
super(stopwatch);
this.maxBurstSeconds = maxBurstSeconds;
}
2、rateLimiter.setRate的定义在父类RateLimiter中:
public final void setRate(double permitsPerSecond) {
checkArgument(
permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
synchronized (mutex()) {
doSetRate(permitsPerSecond, stopwatch.readMicros());
}
}
该方法使用synchronized(mutex())方法对互斥锁进行同步,以保证多线程调用的安全,然后调用子类的doSetRate方法。 第二个参数nowMicros传的值是调用了stopwatch的方法,将限速器创建的时间定义为0,然后计算了当前时间和创建时间的时间差,因此采用的是相对时间。
// Can't be initialized in the constructor because mocks don't call the constructor.
// 从上行注释可看出,这是因为mock才用了懒加载, 实际上即时加载代码更简洁
@CheckForNull private volatile Object mutexDoNotUseDirectly;
// 双重检查锁的懒加载模式
private Object mutex() {
Object mutex = mutexDoNotUseDirectly;
if (mutex == null) {
synchronized (this) {
mutex = mutexDoNotUseDirectly;
if (mutex == null) {
mutexDoNotUseDirectly = mutex = new Object();
}
}
}
return mutex;
}
2-2 doSetRate方法的主体实现在SmoothRateLimiter类中:
final void doSetRate(double permitsPerSecond, long nowMicros) {
// 同步贮存许可和时间
resync(nowMicros);
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}
2-2-1 resync方法用于基于当前时间刷新计算最新的storedPermis和nextFreeTicketMicros.
/** Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time. */
void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
if (nowMicros > nextFreeTicketMicros) {
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}
该方法从现实场景上讲,代表的是随着时间的流逝,贮存许可不断增加,但从技术实现的角度,并不是真正的持续刷新,而是仅在需要时调用刷新。该方法如果当前时间小于等于下次许可时间,则贮存许可数量和下次许可时间不需要刷新;否则通过(当前时间-下次许可时间)/贮存许可的发放间隔计算出的值域最大贮存数量取小,则为已贮存的许可数量,需要注意的是贮存许可数量是double类型的。
5.3.2 限速器使用
限速器常用的方法主要有accquire和tryAccquire。
先说一下accquire方法, 共有两个共有方法,一个是无参的,每次获取1个许可,再一个是整数参数的,每次调用获取多个许可。
// 获取1个许可
public double acquire() {
return acquire(1);
}
// 获取多个许可
public double acquire(int permits) {
// 留出permits个许可,得到需要sleep的微秒数.
long microsToWait = reserve(permits);
// 该方法如果小于等于零则直接返回,否则sleep
stopwatch.sleepMicrosUninterruptibly(microsToWait);
// 返回休眠的秒数.
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
// 预留出permits个许可
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
// 预留出permits个需求,得到需要等待的时间
final long reserveAndGetWaitLength(int permits, long nowMicros) {
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0);
}
abstract long reserveEarliestAvailable(int permits, long nowMicros);
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
// 刷新贮存许可和下个令牌时间
resync(nowMicros);
// 返回值为当前的下次空闲时间
long returnValue = nextFreeTicketMicros;
// 要消耗的贮存数量为需要的贮存数量
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
// 新鲜许可数=需要的许可数-使用的贮存许可
double freshPermits = requiredPermits - storedPermitsToSpend;
// 等待时间=贮存许可等待时间(实现方决定)+新鲜许可等待时间(数量*固定速率)
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
// 透支后的下次许可可用时间=当前时间(nextFreeTicketMicros)+等待时间(waitMicros)
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
// 贮存许可数量减少
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
public boolean tryAcquire(Duration timeout) {
return tryAcquire(1, toNanosSaturated(timeout), TimeUnit.NANOSECONDS);
}
public boolean tryAcquire(long timeout, TimeUnit unit) {
return tryAcquire(1, timeout, unit);
}
public boolean tryAcquire(int permits) {
return tryAcquire(permits, 0, MICROSECONDS);
}
public boolean tryAcquire() {
return tryAcquire(1, 0, MICROSECONDS);
}
public boolean tryAcquire(int permits, Duration timeout) {
return tryAcquire(permits, toNanosSaturated(timeout), TimeUnit.NANOSECONDS);
}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
long timeoutMicros = max(unit.toMicros(timeout), 0);
checkPermits(permits);
long microsToWait;
synchronized (mutex()) {
long nowMicros = stopwatch.readMicros();
// 判断超时微秒数是否可等到下个许可时间
if (!canAcquire(nowMicros, timeoutMicros)) {
return false;
} else {
microsToWait = reserveAndGetWaitLength(permits, nowMicros);
}
}
// 休眠等待
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return true;
}
// 下次许可时间-超时时间<=当前时间
private boolean canAcquire(long nowMicros, long timeoutMicros) {
return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}
5.4 SmoothWarmingUp算法
SmoothWarmingUp算法的主体处理流程同SmoothBurstry算法,主要在贮存许可时间计算上的两个方法进行了新实现,该算法不像SmoothBurstry算法那么直观好理解,需要先了解算法逻辑,再看源码。
5.4.1算法说明
该算法在源码注释中已经描述的比较清晰了,主要思想是限流器的初始贮存许可数量便是最大贮存许可值, 贮存许可执行时按一定算法由慢到快的产生,直至设定的固定速率,以此来达到预热过程。该算法涉及到一些数学知识,如果不是很感兴趣,则了解其主要思想便可。下面详细说一下该算法。
说到该算法前,再回头看一下SmoothRateLimiter的贮存许可,贮存许可有当前数量和最大数量,另外还有两个算法逻辑,一个是贮存许可生产的速率控制,再一个是贮存许可消费速率的控制,在Bursty算法中,生产的速率同设定的固定速率,而消费的速率为无穷大(立刻消费,不占用时间);在WarmingUp算法中,需对照下图进行分析:
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = maxPermits;
double coldIntervalMicros = stableIntervalMicros * coldFactor;
thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
maxPermits =
thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = 0.0;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? maxPermits // initial state is cold
: storedPermits * maxPermits / oldMaxPermits;
}
}
在具体使用中,一个是生产的速率,固定为预热时间/最大许可数,源码如下:
double coolDownIntervalMicros() {
return warmupPeriodMicros / maxPermits;
}
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
long micros = 0;
// measuring the integral on the right part of the function (the climbing line)
if (availablePermitsAboveThreshold > 0.0) {
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
// TODO(cpovirk): Figure out a good name for this variable.
double length =
permitsToTime(availablePermitsAboveThreshold)
+ permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
micros = (long) (permitsAboveThresholdToTake * length / 2.0);
permitsToTake -= permitsAboveThresholdToTake;
}
// measuring the integral on the left part of the function (the horizontal line)
micros += (long) (stableIntervalMicros * permitsToTake);
return micros;
}
5.4.2源码分析
了解了以上算法后,再看下面的源码就相对简单了。
static final class SmoothWarmingUp extends SmoothRateLimiter {
// 预热时间
private final long warmupPeriodMicros;
//斜率
private double slope;
//阈值许可
private double thresholdPermits;
//冷却因子
private double coldFactor;
SmoothWarmingUp(
SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) {
super(stopwatch);
this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod);
this.coldFactor = coldFactor;
}
// 参数初始化
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = maxPermits;
double coldIntervalMicros = stableIntervalMicros * coldFactor;
thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
maxPermits =
thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = 0.0;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? maxPermits // initial state is cold
: storedPermits * maxPermits / oldMaxPermits;
}
}
// 有storedPermits个贮存许可,要使用permitsToTake个时的等待时间计算
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
long micros = 0;
// measuring the integral on the right part of the function (the climbing line)
if (availablePermitsAboveThreshold > 0.0) {
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
// TODO(cpovirk): Figure out a good name for this variable.
double length =
permitsToTime(availablePermitsAboveThreshold)
+ permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
micros = (long) (permitsAboveThresholdToTake * length / 2.0);
permitsToTake -= permitsAboveThresholdToTake;
}
// measuring the integral on the left part of the function (the horizontal line)
micros += (long) (stableIntervalMicros * permitsToTake);
return micros;
}
// 许可耗时=固定速率+许可值*斜率
private double permitsToTime(double permits) {
return stableIntervalMicros + permits * slope;
}
// 冷却间隔固定为预热时间/最大许可数.
@Override
double coolDownIntervalMicros() {
return warmupPeriodMicros / maxPermits;
}
}
06
思考总结
理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取最新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,最终将目标页面展示到屏幕。
6.1 sleep说明和相对时间
RateLimiter内部使用类StopWatch进行了一个相对时间的度量,RateLimiter创建时,时间为0,然后向后累计,sleep时不受interrupt异常影响。
6.2 double浮点数
RateLimiter暴露的API的许可数量入参为整数类型,但内部计算时实际是浮点double类型,支持小数许可数量,一方面浮点存在丢失精度,另一方面也不便于理解;是否可以使用整数值得考虑。
6.3 只支持单机
RateLimiter的这几种算法只支持单机限流,如要支持集群限流,一种方式是先根据负载均衡的权重计算出单机的限速值,再进行单节点限速;另一种方式是参考该组件使用redis等中心化数量管理的中间件,但性能和稳定性会降低一些。
6.4 扩展性
RateLimiter提供了有限的扩展能力,自带的SmoothBursty和SmoothWarmingUp类不是公开类,不能直接创建或调整参数,如关闭贮存功能或调整预热系数等。这种场景需要继承SmoothRateLimiter进行重写,贮存许可的生产和消费算法是容易变化和重写的点,将整个源码拷贝出来进行二次修改也是一种方案。
基于CAS的单点登录实践之路
借降本增效之名,探索开闭原则架构设计
求分享
求点赞
求在看